kafka常用操作:
查看topic列表:
1.kafka-topics.sh –list –zookeeper 10.1.14.39:2181,10.1.14.40:2181,10.1.14.41:2181
创建topic:
2.kafka-topics.sh –create –zookeeper 10.1.14.39:2181,10.1.14.40:2181,10.1.14.41:2181 –replication-factor 3 –partitions 1 –topic my-replicated-to
查看topic详细信息
3.kafka-topics.sh –describe –zookeeper 10.1.14.39:2181,10.1.14.40:2181,10.1.14.41:2181 –topic
生产消息:
./kafka-console-producer.sh –broker-list 10.1.14.39:9092,10.1.14.40:9092,10.1.14.41:9092 –topic nglogs
消费消息
./kafka-console-consumer.sh –bootstrap-server 10.1.14.39:9092,10.1.14.40:9092,10.1.14.41:9092 –from-beginning –topic nglogs
zookeeper相关命令:
bin/zkServer.sh stop
bin/zkServer.sh start
bin/zkServer.sh status
./zkCli.sh -server 10.1.14.41:2181
#see all topics
ls /brokers/topics
ls /
#broker info
ls /brokers/ids
get /brokers/ids/0
get /controller_epoch
#check partition
get /brokers/topics/my-replicated-topic
get /brokers/topics/nglogs/partitions/state
#add partions
ls /brokers/topics/nglogs/partitions
#修改分区
bin/kafka-topics.sh –alter –zookeeper 10.1.14.39:2181,10.1.14.40:2181,10.1.14.41:2181 –partitions 3 –topic nglogs
测试脚本:
consumer.py
1 2 3 4 |
from kafka import KafkaConsumer consumer = KafkaConsumer('test',bootstrap_servers=['10.1.0.39:9092','10.1.0.45:9092','10.1.0.208:9092']) for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value)) |
producter.py
1 2 3 4 5 6 |
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['10.1.0.39:9092','10.1.0.45:9092','10.1.0.208:9092']) for i in range(3): msg = 'hello world' producer.send('test', msg) producer.close() |